package com.nx.platform.es.biz.esspider.process;


import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.nx.platform.es.biz.esspider.recycle.Recycler;
import com.nx.platform.es.biz.esspider.sink.Sinks;
import com.nx.platform.es.system.config.ConfigCenter;
import com.nx.platform.es.biz.esspider.entity.Item;
import com.nx.platform.es.biz.esspider.entity.Message;
import com.nx.platform.es.biz.esspider.handler.Handlers;
import com.nx.platform.es.common.utils.IdSharding;
import com.nx.platform.es.common.utils.MoreMaps;
import com.nx.platform.es.common.utils.MoreThreadFactorys;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.common.Booleans;
import org.slf4j.MDC;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import static com.nx.platform.es.common.utils.Constants.*;

/**
 * Processing service: owns the processing threads and acts as the entry point
 * into the handler chain (batching queue → handlers → sinks → recycler).
 *
 * @since 2017-03-25
 */
@Slf4j
public class ProcessorServiceImpl extends AbstractIdleService implements ProcessorService {

    private final String code;

    private final Handlers handlers;
    private final Sinks sinks;
    private final Recycler recycler;
    private final String debugSwitchConfigKey;

    private final int flushSizeLimit;
    private final long flushTimeLimit;
    private final IdSharding processIdSharding;
    private final int processBulkSize;
    private final int poolSize;

    private BlockingQueue<Message> queue;
    private AtomicLong lastFlushTime;
    private ExecutorService executor;

    public ProcessorServiceImpl(
            String code, Map<?, ?> settings,
            Handlers handlerRouter, Sinks sinkRouter,
            Recycler recycler,
            String debugSwitchConfigKey) {
        this.code = code;
        this.recycler = recycler;
        this.handlers = handlerRouter;
        this.sinks = sinkRouter;
        this.debugSwitchConfigKey = Strings.nullToEmpty(debugSwitchConfigKey) + "." + code;
        this.flushTimeLimit = MoreMaps.getLongValue(settings, FLUSH_TIME_LIMIT, FLUSH_TIME_LIMIT_DEFAULT);
        this.flushSizeLimit = MoreMaps.getIntValue(settings, FLUSH_SIZE_LIMIT, FLUSH_SIZE_LIMIT_DEFAULT);
        this.poolSize = MoreMaps.getIntValue(settings, POOL_SIZE, POOL_SIZE_DEFAULT);
        this.processIdSharding = IdSharding.get(MoreMaps.getString(settings, PROCESS_ID_SHARDING));
        this.processBulkSize = MoreMaps.getIntValue(settings, PROCESS_BULK_SIZE, PROCESS_BULK_SIZE_DEFAULT);
    }

    private boolean isDebugSwitchOpen() {
        return (Boolean) ConfigCenter.getConfig(debugSwitchConfigKey,
                s -> Booleans.parseBoolean(s, false)).orElse(false);
    }

    @Override
    protected void startUp() throws Exception {

        // validate config
        Preconditions.checkArgument(flushTimeLimit >= FLUSH_TIME_LIMIT_MIN);
        Preconditions.checkArgument(flushTimeLimit <= FLUSH_TIME_LIMIT_MAX);
        Preconditions.checkArgument(flushSizeLimit >= FLUSH_SIZE_LIMIT_MIN);
        Preconditions.checkArgument(flushSizeLimit <= FLUSH_SIZE_LIMIT_MAX);
        Preconditions.checkArgument(processBulkSize >= PROCESS_BULK_SIZE_MIN);
        Preconditions.checkArgument(processBulkSize <= PROCESS_BULK_SIZE_MAX);
        Preconditions.checkArgument(poolSize >= POOL_SIZE_MIN);
        Preconditions.checkArgument(poolSize <= POOL_SIZE_MAX);

        // init query and last flush time
        queue = new LinkedBlockingQueue<>(flushSizeLimit * 2);
        lastFlushTime = new AtomicLong(System.currentTimeMillis());

        // int auto flush 排队，积累线程处理方案
        Executors.newSingleThreadScheduledExecutor(MoreThreadFactorys
                .newSingleThreadNameableDaemonThreadFactory("FlushNotifyer(" + code + ")"))
                .scheduleWithFixedDelay(this::flushIfNecessary,
                        flushTimeLimit, flushTimeLimit, TimeUnit.MILLISECONDS);

        // init handler executor
        executor = new ThreadPoolExecutor(poolSize, poolSize,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                MoreThreadFactorys.newNameableThreadFactory("Processor(" + code + ")"),
                new ThreadPoolExecutor.CallerRunsPolicy());

    }

    @Override
    protected void shutDown() {
        flush();
        MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.HOURS);
    }

    @Override
    public void deliver(Message message) throws Exception {
        queue.put(message);
        flushIfNecessary();
    }

    /**
     * 判断批量是否达到
     */
    private void flushIfNecessary() {
        if (isNecessary()) {
            synchronized (this) {
                if (isNecessary()) {
                    flush();
                }
            }
        }
    }

    private boolean isNecessary() {
        return !queue.isEmpty() &&
                (queue.size() >= flushSizeLimit ||
                        System.currentTimeMillis() - lastFlushTime.get() > flushTimeLimit);
    }

    private synchronized void flush() {
        // 从队列中批量取消息，批量去重压缩
        lastFlushTime.set(System.currentTimeMillis());
        int amount = Math.min(queue.size(), flushSizeLimit);
        Multimap<Long, Message> msgs = LinkedListMultimap.create(amount);
        Message msg = queue.poll();
        while (msg != null && amount > 0) {
            msgs.put(msg.getId(), msg);
            msg = queue.poll();
            amount--;
        }
        if (msgs.isEmpty()) {
            return;
        }
        // sharding后处理， 先分组，后进行批量处理
        for (Collection<Long> bulk : processIdSharding.shardingAndPartition(msgs.keySet(), processBulkSize)) {
            Map<Long, Item> items = new LinkedHashMap<>(bulk.size());
            for (Long id : bulk) {
                Collection<Message> mc = msgs.get(id);
                List<String> sources = new ArrayList<>(mc.size());   //表示mq，或者import
                List<LocalDateTime> timestamps = new ArrayList<>(mc.size());
                List<String> lines = new ArrayList<>(mc.size());    //原始消息
                Map<String, Object> doc = new LinkedHashMap<>();
                mc.forEach(e -> {
                    sources.add(e.getSource());
                    timestamps.add(e.getTimestamp());
                    lines.add(e.getLine());
                    if (e.getDoc() != null) {
                        doc.putAll(e.getDoc());
                    }
                });
                Item item = new Item();
                item.setSource(sources);
                item.setTimestamp(timestamps);
                item.setLines(lines);
                item.setType(code);
                item.setId(id);
                item.setDoc(doc);
                items.put(id, item);
            }
            // 批量处理
            executor.execute(() -> process(items));
        }
    }

    private void process(Map<Long, Item> items) {
        MDC.put("logStr", code +"_" + items.size()+ "_" + System.currentTimeMillis());
        log.info("Processing ({}).", items.size());
        try {
            //1. 执行所有的handler check
            handlers.check(code);
            //2. 执行ES check
            sinks.check(code);
            //3. 执行handler过滤操作
            handlers.filter(code, items);
            //4. 执行ES刷盘操作
            Map<Long, Item> failedItems = sinks.accept(code, items);
            if (!failedItems.isEmpty()) {
                log.info("Processed ({}/{}), recycle failed items.",
                        items.size() - failedItems.size(), items.size());
                log.error("desc=process_error action=add_redo item={}", failedItems.entrySet().iterator().next());
                recycler.recycle(failedItems);
            } else {
                log.info("Processed ({}/{}).", items.size(), items.size());
            }
        } catch (Exception e) {
            log.error("Processed (0/{}), recycle {}", items.size(), e.getMessage());
            // 5. 死信队列重试
            recycler.recycle(items);
        } finally {
            if (log.isDebugEnabled() || isDebugSwitchOpen()) {
                String format = "Processed\n{\n\t" +
                        "@timestamp => {}\n\t@source => {}\n\t" +
                        "@lines => {}\n\t@op => {}\n\t@type => {}\n\t@id => {}\n\t" +
                        "@doc => {}\n\t@effectDocs => {}\n\t@results => {}\n}";
                items.values().forEach((Item item) ->
                        log.info(format, item.getTimestamp(), item.getSource(),
                                item.getLines(), item.getOpType(), item.getType(), item.getId(),
                                item.getDoc(), item.getEffectDocs(), item.getResults())
                );
            }
            MDC.remove("logStr");
        }
    }

    @Override
    public String getCode() {
        return code;
    }

    @Override
    protected String serviceName() {
        return toString();
    }

    @Override
    public String toString() {
        return "ProcessorService(" + code + ")";
    }

}
